K-Nearest Neighbors Time Series Regressor (KNN-TS) — analog forecasting from scratch#
A KNN time-series regressor predicts by finding similar past windows (“analogs”) and averaging what happened next.
Why it’s useful:
non-parametric baseline (no parametric trend/seasonality assumptions)
can model nonlinear patterns if they repeat in history
naturally provides a distribution of plausible next values via neighbor targets (confidence intuition)
Goals#
Implement KNeighborsTimeSeriesRegressor in NumPy (brute-force neighbors)
Show the key design choices: windowing, distance metric, normalization, weighting
Plot with Plotly:
the raw series
1-step ahead forecasts with an empirical prediction band from neighbors
a “fan” view to build confidence intuition
1) Time series forecasting as supervised regression#
Given a univariate series \(y_0, y_1, \dots, y_{T-1}\), choose:
window length \(L\) (how many past points to compare)
forecast horizon \(h\) (predict how far ahead)
Define the feature window and target:

$$
\mathbf{x}_t = [y_{t-L},\; y_{t-L+1},\; \dots,\; y_{t-1}]^\top \in \mathbb{R}^L,
\qquad
y^{(\mathrm{target})}_t = y_{t+h-1},
$$
for \(t = L, L+1, \dots, T-h\).
This builds a supervised dataset \((\mathbf{x}_t, y^{(\mathrm{target})}_t)\).
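As a quick sanity check on the indexing convention, here is a minimal sketch for a toy series with \(T = 10\), \(L = 3\), \(h = 1\), using NumPy's `sliding_window_view` (the full implementation below uses an explicit loop instead):

```python
import numpy as np

# Worked example: window x_t = [y_{t-3}, y_{t-2}, y_{t-1}] predicts y_t (h = 1).
y = np.arange(10, dtype=float)  # y_0 .. y_9, so T = 10
L, h = 3, 1

windows = np.lib.stride_tricks.sliding_window_view(y, L)[: y.size - L - h + 1]
targets = y[L + h - 1 :]

print(windows[0], targets[0])        # [0. 1. 2.] 3.0
print(windows.shape, targets.shape)  # (7, 3) (7,)
```

Note that the number of samples is \(T - L - h + 1\), which matches the range \(t = L, \dots, T-h\) above.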
2) KNN regression on windows#
Given a query window \(\mathbf{x}\), compute distances to all training windows \(\{\mathbf{x}_i\}_{i=1}^n\):

$$
d_i = d(\mathbf{x}, \mathbf{x}_i).
$$
Let \(\mathcal{N}_k(\mathbf{x})\) be the indices of the \(k\) smallest distances.
Uniform-weight KNN regressor#
Average the neighbor targets with equal weight:

$$
\hat{y}(\mathbf{x}) = \frac{1}{k} \sum_{i\in\mathcal{N}_k(\mathbf{x})} y_i.
$$
Distance-weighted KNN regressor#
A common choice is inverse-distance weights:

$$
w_i = \frac{1}{d_i + \varepsilon},
\qquad
\hat{y}(\mathbf{x}) = \frac{\sum_{i\in\mathcal{N}_k(\mathbf{x})} w_i y_i}{\sum_{i\in\mathcal{N}_k(\mathbf{x})} w_i}.
$$
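A toy comparison of the two estimators, with hypothetical distances and targets:

```python
import numpy as np

# Hypothetical neighbor targets and their distances to the query
y_nei = np.array([10.0, 12.0, 20.0])
d = np.array([0.1, 0.2, 2.0])
eps = 1e-8

y_uniform = y_nei.mean()                    # plain average of neighbor targets
w = 1.0 / (d + eps)                         # inverse-distance weights
y_weighted = np.sum(w * y_nei) / np.sum(w)  # pulled toward the closest neighbors

print(y_uniform, y_weighted)  # 14.0 vs. roughly 10.97
```

The far-away neighbor (distance 2.0) gets almost no weight, so the weighted estimate stays near the two close targets.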
Distance choice matters#
Euclidean on raw windows matches level + shape.
Z-normalized Euclidean (normalize each window to mean 0, std 1) matches shape more than level.
DTW (dynamic time warping) matches shapes that may be slightly time-warped.
This notebook implements Euclidean (fast, vectorized) and DTW (slower, but instructive).
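To see why the choice matters, compare raw and z-normalized Euclidean distances between two windows that share a shape but not a level (a small sketch):

```python
import numpy as np

def znorm(x, eps=1e-8):
    # Normalize a window to mean 0, std 1 (shape matching)
    return (x - x.mean()) / (x.std() + eps)

t = np.linspace(0, 2 * np.pi, 50)
a = np.sin(t)        # reference window
b = np.sin(t) + 5.0  # same shape, shifted level

d_raw = np.linalg.norm(a - b)                  # dominated by the level gap
d_shape = np.linalg.norm(znorm(a) - znorm(b))  # ~0: the shapes are identical
print(d_raw, d_shape)
```

Raw Euclidean treats the two windows as far apart; z-normalized Euclidean treats them as the same analog.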
import numpy as np
import pandas as pd
import plotly
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import os
import plotly.io as pio
from sklearn.metrics import mean_absolute_error, mean_squared_error
pio.renderers.default = os.environ.get("PLOTLY_RENDERER", "notebook")
pio.templates.default = "plotly_white"
rng = np.random.default_rng(7)
print("numpy:", np.__version__)
print("pandas:", pd.__version__)
print("plotly:", plotly.__version__)
numpy: 1.26.2
pandas: 2.1.3
plotly: 6.5.2
3) Synthetic series with repeating structure#
KNN works well when patterns repeat. We’ll simulate a series with trend + multiple seasonalities + noise.
n = 520
t = np.arange(n)
dates = pd.date_range("2020-01-01", periods=n, freq="D")
trend = 0.03 * t
season_30 = 2.2 * np.sin(2 * np.pi * t / 30)
season_7 = 0.9 * np.sin(2 * np.pi * t / 7)
noise = rng.normal(0.0, 0.9, size=n)
# A few shock events
shocks = np.zeros(n)
shock_idx = rng.choice(np.arange(40, n - 40), size=10, replace=False)
shocks[shock_idx] = rng.normal(0.0, 4.0, size=len(shock_idx))
y = 50 + trend + season_30 + season_7 + noise + shocks
fig = go.Figure()
fig.add_trace(go.Scatter(x=dates, y=y, mode="lines", name="y"))
fig.update_layout(height=350, title="Synthetic series (trend + seasonality + noise)", xaxis_title="Time", yaxis_title="Value")
fig.show()
def make_supervised_windows(
y: np.ndarray,
*,
window_length: int,
horizon: int = 1,
stride: int = 1,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Turn a 1D series into (X, y_target, t_target).
X[i] is a length-L window, and y_target[i] is the value horizon steps after the window end.
t_target[i] is the time index of y_target[i] in the original series.
"""
y = np.asarray(y, dtype=float)
L = int(window_length)
h = int(horizon)
s = int(stride)
if L < 2:
raise ValueError("window_length must be >= 2")
if h < 1:
raise ValueError("horizon must be >= 1")
if s < 1:
raise ValueError("stride must be >= 1")
last_start = y.size - L - h
if last_start < 0:
raise ValueError("y is too short for the chosen window_length/horizon")
starts = np.arange(0, last_start + 1, s)
n_samples = len(starts)
X = np.empty((n_samples, L), dtype=float)
y_target = np.empty(n_samples, dtype=float)
t_target = np.empty(n_samples, dtype=int)
for i, start in enumerate(starts):
end = start + L
target_idx = end + h - 1
X[i] = y[start:end]
y_target[i] = y[target_idx]
t_target[i] = target_idx
return X, y_target, t_target
L = 60
horizon = 1
X, y_next, t_next = make_supervised_windows(y, window_length=L, horizon=horizon, stride=1)
h_test = 120
X_train, y_train = X[:-h_test], y_next[:-h_test]
X_test, y_test = X[-h_test:], y_next[-h_test:]
t_test = t_next[-h_test:]
print("X_train:", X_train.shape, "| X_test:", X_test.shape)
print("y_train:", y_train.shape, "| y_test:", y_test.shape)
X_train: (340, 60) | X_test: (120, 60)
y_train: (340,) | y_test: (120,)
def zscore_per_series(X: np.ndarray, *, eps: float = 1e-8) -> np.ndarray:
"""Z-normalize each sample (and each dim) across time."""
X = np.asarray(X, dtype=float)
if X.ndim == 2:
mu = X.mean(axis=1, keepdims=True)
sd = X.std(axis=1, keepdims=True)
return (X - mu) / (sd + eps)
if X.ndim == 3:
mu = X.mean(axis=2, keepdims=True)
sd = X.std(axis=2, keepdims=True)
return (X - mu) / (sd + eps)
raise ValueError("X must be 2D or 3D")
def pairwise_euclidean_distances(X_query: np.ndarray, X_train: np.ndarray) -> np.ndarray:
"""Fast pairwise Euclidean distances for 2D arrays (n_query, p) vs (n_train, p)."""
X_query = np.asarray(X_query, dtype=float)
X_train = np.asarray(X_train, dtype=float)
if X_query.ndim != 2 or X_train.ndim != 2:
raise ValueError("X_query and X_train must be 2D arrays")
if X_query.shape[1] != X_train.shape[1]:
raise ValueError("X_query and X_train must have the same number of features")
q_sq = np.sum(X_query**2, axis=1, keepdims=True)
t_sq = np.sum(X_train**2, axis=1)
d2 = q_sq + t_sq[None, :] - 2.0 * (X_query @ X_train.T)
d2 = np.maximum(d2, 0.0)
return np.sqrt(d2)
def dtw_distance_1d(a: np.ndarray, b: np.ndarray, *, window: int | None = None) -> float:
"""DTW distance between two 1D sequences using squared-cost DP (optionally banded)."""
a = np.asarray(a, dtype=float)
b = np.asarray(b, dtype=float)
n = a.size
m = b.size
if window is None:
window = max(n, m)
window = int(window)
window = max(window, abs(n - m))
D = np.full((n + 1, m + 1), np.inf)
D[0, 0] = 0.0
for i in range(1, n + 1):
j_start = max(1, i - window)
j_end = min(m, i + window)
for j in range(j_start, j_end + 1):
cost = (a[i - 1] - b[j - 1]) ** 2
D[i, j] = cost + min(D[i - 1, j], D[i, j - 1], D[i - 1, j - 1])
return float(np.sqrt(D[n, m]))
def pairwise_dtw_distances(X_query: np.ndarray, X_train: np.ndarray, *, window: int | None = None) -> np.ndarray:
X_query = np.asarray(X_query, dtype=float)
X_train = np.asarray(X_train, dtype=float)
if X_query.ndim != 2 or X_train.ndim != 2:
raise ValueError("X_query and X_train must be 2D arrays")
if X_query.shape[1] != X_train.shape[1]:
raise ValueError("DTW here assumes equal-length windows")
out = np.empty((X_query.shape[0], X_train.shape[0]), dtype=float)
for i in range(X_query.shape[0]):
for j in range(X_train.shape[0]):
out[i, j] = dtw_distance_1d(X_query[i], X_train[j], window=window)
return out
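A tiny check of what DTW buys over lockstep comparison: when one sequence repeats a value, the warping path can absorb the repeat at zero cost. This uses the same squared-cost DP recurrence as dtw_distance_1d above, unbanded, on sequences too short for the band to matter:

```python
import numpy as np

a = np.array([1.0, 2.0, 3.0])
b = np.array([1.0, 2.0, 2.0, 3.0])  # same shape, one value repeated
n, m = len(a), len(b)

D = np.full((n + 1, m + 1), np.inf)
D[0, 0] = 0.0
for i in range(1, n + 1):
    for j in range(1, m + 1):
        cost = (a[i - 1] - b[j - 1]) ** 2
        D[i, j] = cost + min(D[i - 1, j], D[i, j - 1], D[i - 1, j - 1])

print(np.sqrt(D[n, m]))  # 0.0 — the repeat is absorbed by the warping path
```

Lockstep Euclidean distance is not even defined here (the lengths differ), which is why this notebook's Euclidean path requires equal-length windows.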
def weighted_quantile_1d(values: np.ndarray, quantiles: np.ndarray, *, weights: np.ndarray) -> np.ndarray:
"""Weighted quantiles for 1D values. quantiles in [0,1]."""
values = np.asarray(values, dtype=float)
weights = np.asarray(weights, dtype=float)
quantiles = np.asarray(quantiles, dtype=float)
if values.ndim != 1 or weights.ndim != 1:
raise ValueError("values and weights must be 1D")
if values.size != weights.size:
raise ValueError("values and weights must have the same length")
if np.any(quantiles < 0) or np.any(quantiles > 1):
raise ValueError("quantiles must be in [0,1]")
sorter = np.argsort(values)
v = values[sorter]
w = weights[sorter]
w_sum = np.sum(w)
if w_sum <= 0:
raise ValueError("sum of weights must be > 0")
cdf = np.cumsum(w) / w_sum
return np.interp(quantiles, cdf, v)
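A quick toy check of the cumulative-weight interpolation above: putting most of the weight on the largest value pulls the weighted median toward it.

```python
import numpy as np

values = np.array([1.0, 2.0, 3.0])
weights = np.array([1.0, 1.0, 8.0])  # most mass on the value 3

order = np.argsort(values)
v, w = values[order], weights[order]
cdf = np.cumsum(w) / w.sum()         # [0.1, 0.2, 1.0]
wmedian = np.interp(0.5, cdf, v)     # interpolate the 0.5 quantile on the CDF

print(wmedian)  # 2.375, vs. a plain (unweighted) median of 2.0
```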
def _as_3d_panel(X: np.ndarray) -> np.ndarray:
"""Accept (n,m) or (n,d,m) and return (n,d,m)."""
X = np.asarray(X, dtype=float)
if X.ndim == 2:
return X[:, None, :]
if X.ndim == 3:
return X
raise ValueError("X must have shape (n,m) or (n,d,m)")
class KNeighborsTimeSeriesRegressor:
"""Brute-force KNN regressor for fixed-length time-series windows.
Parameters
----------
n_neighbors : int
Number of neighbors (k).
weights : {"uniform","distance"}
Uniform average or inverse-distance weighting.
metric : {"euclidean","dtw"}
Distance metric between windows.
normalize : {None,"zscore","global"}
- None: use raw windows
- "zscore": normalize each window to mean 0 and std 1 (shape matching)
- "global": normalize using training mean/std (level + scale normalization)
dtw_window : int | None
Sakoe-Chiba band size for DTW (None -> unconstrained).
eps : float
Small constant for inverse-distance weights.
"""
def __init__(
self,
*,
n_neighbors: int = 10,
weights: str = "uniform",
metric: str = "euclidean",
normalize: str | None = "zscore",
dtw_window: int | None = None,
eps: float = 1e-8,
):
self.n_neighbors = int(n_neighbors)
self.weights = str(weights)
self.metric = str(metric)
self.normalize = normalize
self.dtw_window = dtw_window
self.eps = float(eps)
def fit(self, X, y):
X3 = _as_3d_panel(X)
y = np.asarray(y, dtype=float).reshape(-1)
if X3.shape[0] != y.size:
raise ValueError("X and y must have the same number of samples")
if self.n_neighbors < 1:
raise ValueError("n_neighbors must be >= 1")
if self.weights not in {"uniform", "distance"}:
raise ValueError("weights must be 'uniform' or 'distance'")
if self.metric not in {"euclidean", "dtw"}:
raise ValueError("metric must be 'euclidean' or 'dtw'")
if self.normalize not in {None, "zscore", "global"}:
raise ValueError("normalize must be None, 'zscore', or 'global'")
self.n_train_ = int(X3.shape[0])
self.n_dims_ = int(X3.shape[1])
self.n_timepoints_ = int(X3.shape[2])
self.X_train_raw_ = X3.copy()
self.y_train_ = y.copy()
if self.normalize == "global":
mu = X3.mean(axis=(0, 2), keepdims=True)
sd = X3.std(axis=(0, 2), keepdims=True)
self._mu_ = mu
self._sd_ = sd
self.X_train_ = (X3 - mu) / (sd + self.eps)
elif self.normalize == "zscore":
self.X_train_ = zscore_per_series(X3, eps=self.eps)
else:
self.X_train_ = X3
return self
def _transform(self, X) -> np.ndarray:
X3 = _as_3d_panel(X)
if X3.shape[1] != self.n_dims_ or X3.shape[2] != self.n_timepoints_:
raise ValueError(
f"X must have shape (n,{self.n_dims_},{self.n_timepoints_}) (or (n,{self.n_timepoints_}) for univariate)"
)
if self.normalize == "global":
return (X3 - self._mu_) / (self._sd_ + self.eps)
if self.normalize == "zscore":
return zscore_per_series(X3, eps=self.eps)
return X3
def _pairwise_distances(self, X_query) -> np.ndarray:
Q3 = self._transform(X_query)
T3 = self.X_train_
if self.metric == "euclidean":
Q2 = Q3.reshape(Q3.shape[0], -1)
T2 = T3.reshape(T3.shape[0], -1)
return pairwise_euclidean_distances(Q2, T2)
# DTW implementation here is univariate for simplicity.
if self.n_dims_ != 1:
raise ValueError("DTW metric currently supports only univariate windows")
Q2 = Q3[:, 0, :]
T2 = T3[:, 0, :]
return pairwise_dtw_distances(Q2, T2, window=self.dtw_window)
def kneighbors(self, X_query) -> tuple[np.ndarray, np.ndarray]:
dists = self._pairwise_distances(X_query)
k = min(self.n_neighbors, self.n_train_)
# argpartition gives k smallest in arbitrary order
idx = np.argpartition(dists, kth=k - 1, axis=1)[:, :k]
row = np.arange(dists.shape[0])[:, None]
dist_k = dists[row, idx]
# sort neighbors by distance
order = np.argsort(dist_k, axis=1)
idx_sorted = idx[row, order]
dist_sorted = dist_k[row, order]
return dist_sorted, idx_sorted
def predict(self, X_query) -> np.ndarray:
dist, idx = self.kneighbors(X_query)
yk = self.y_train_[idx]
if self.weights == "uniform":
return yk.mean(axis=1)
w = 1.0 / (dist + self.eps)
w = w / np.sum(w, axis=1, keepdims=True)
return np.sum(w * yk, axis=1)
def neighbor_targets(self, X_query) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Return (distances, indices, neighbor y-values) for each query window."""
dist, idx = self.kneighbors(X_query)
return dist, idx, self.y_train_[idx]
model = KNeighborsTimeSeriesRegressor(
n_neighbors=25,
weights="distance",
metric="euclidean",
normalize="zscore",
)
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
# Empirical "prediction band" from the neighbor targets.
dist, idx, y_nei = model.neighbor_targets(X_test)
if model.weights == "distance":
w = 1.0 / (dist + model.eps)
q = weighted_quantile_1d
qs = np.array([0.05, 0.50, 0.95])
q_mat = np.vstack([q(y_nei[i], qs, weights=w[i]) for i in range(y_nei.shape[0])])
q05, q50, q95 = q_mat[:, 0], q_mat[:, 1], q_mat[:, 2]
else:
q05, q50, q95 = np.quantile(y_nei, [0.05, 0.50, 0.95], axis=1)
mae = mean_absolute_error(y_test, y_pred)
rmse = float(np.sqrt(mean_squared_error(y_test, y_pred)))
print(f"MAE: {mae:.3f}")
print(f"RMSE: {rmse:.3f}")
MAE: 6.285
RMSE: 6.434
test_dates = dates[t_test]
fig = go.Figure()
fig.add_trace(go.Scatter(x=dates, y=y, mode="lines", name="y", line=dict(color="#444")))
# 90% band from neighbor targets
fig.add_trace(
go.Scatter(x=test_dates, y=q95, mode="lines", line=dict(width=0), showlegend=False)
)
fig.add_trace(
go.Scatter(
x=test_dates,
y=q05,
mode="lines",
line=dict(width=0),
fill="tonexty",
fillcolor="rgba(31,119,180,0.20)",
name="90% neighbor band",
)
)
fig.add_trace(go.Scatter(x=test_dates, y=y_test, mode="lines", name="Actual (test)", line=dict(color="#000")))
fig.add_trace(go.Scatter(x=test_dates, y=y_pred, mode="lines", name="KNN forecast", line=dict(color="#1f77b4")))
fig.update_layout(
height=450,
title="1-step ahead forecast via KNN windows (with neighbor-based band)",
xaxis_title="Time",
yaxis_title="Value",
)
fig.show()
4) Confidence intuition: the neighbor target distribution#
KNN doesn’t give a parametric confidence interval, but it naturally gives a set of neighbor targets.
If the nearest neighbors all lead to similar next-step values, the forecast is “confident”. If neighbor targets vary widely, you should expect more uncertainty.
i = 35 # pick a test point
xq = X_test[i : i + 1]
y_true = float(y_test[i])
y_hat = float(y_pred[i])
dist1, idx1, yk = model.neighbor_targets(xq)
dist1 = dist1[0]
idx1 = idx1[0]
yk = yk[0]
fig = make_subplots(rows=1, cols=2, column_widths=[0.55, 0.45], subplot_titles=("Query vs nearest windows", "Neighbor targets"))
# Left: overlay neighbor windows (aligned by relative time within the window)
rel_t = np.arange(L)
for j in range(min(12, len(idx1))):
fig.add_trace(
go.Scatter(
x=rel_t,
y=X_train[idx1[j]],
mode="lines",
line=dict(color="rgba(31,119,180,0.18)", width=1),
showlegend=False,
),
row=1,
col=1,
)
fig.add_trace(
go.Scatter(x=rel_t, y=xq[0], mode="lines", name="Query window", line=dict(color="#000", width=3)),
row=1,
col=1,
)
# Right: histogram of neighbor targets
fig.add_trace(go.Histogram(x=yk, nbinsx=18, name="Neighbor y", marker=dict(color="rgba(31,119,180,0.65)")), row=1, col=2)
for xval, name, color in [
(y_true, "Actual", "#000"),
(y_hat, "Pred", "#1f77b4"),
(float(np.median(yk)), "Median(nei)", "#ff7f0e"),
]:
fig.add_vline(x=xval, line_width=2, line_dash="dash", line_color=color, row=1, col=2)
fig.update_xaxes(title_text="Window time index", row=1, col=1)
fig.update_yaxes(title_text="Value", row=1, col=1)
fig.update_xaxes(title_text="Next value", row=1, col=2)
fig.update_yaxes(title_text="Count", row=1, col=2)
fig.update_layout(height=420, title="KNN confidence intuition via neighbor dispersion")
fig.show()
5) Forecasting beyond the observed data (recursive)#
To forecast multiple steps ahead, one simple approach is recursive forecasting:
predict the next point from the last window,
append it to the history,
repeat.
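In miniature, with a stand-in one-step predictor (the window mean, purely illustrative — the notebook's implementation below uses the KNN model):

```python
import numpy as np

def predict_one_step(window):
    # Stand-in for any 1-step model (KNN in this notebook)
    return float(np.mean(window))

buf = [1.0, 2.0, 3.0, 4.0]  # observed history
L, steps = 3, 2
for _ in range(steps):
    buf.append(predict_one_step(buf[-L:]))  # predict, append, repeat

print(buf[-steps:])  # the two recursively built forecasts
```

Note that step 2 already consumes the step-1 forecast, so errors compound as the horizon grows.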
We’ll also propagate a simple fan by taking quantiles of the neighbor targets at each step (heuristic, but visually useful).
model_full = KNeighborsTimeSeriesRegressor(
n_neighbors=25,
weights="distance",
metric="euclidean",
normalize="zscore",
)
# Fit on all available windows.
X_all, y_all, t_all = make_supervised_windows(y, window_length=L, horizon=1, stride=1)
model_full.fit(X_all, y_all)
def recursive_forecast_with_band(
model: KNeighborsTimeSeriesRegressor,
y_hist: np.ndarray,
*,
window_length: int,
steps: int,
qs=(0.05, 0.50, 0.95),
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
y_hist = np.asarray(y_hist, dtype=float)
L = int(window_length)
out = np.zeros((steps, 3), dtype=float)
buf = y_hist.tolist()
for s in range(steps):
xq = np.asarray(buf[-L:], dtype=float)[None, :]
dist, _, yk = model.neighbor_targets(xq)
dist = dist[0]
yk = yk[0]
if model.weights == "distance":
w = 1.0 / (dist + model.eps)
out[s] = weighted_quantile_1d(yk, np.asarray(qs), weights=w)
else:
out[s] = np.quantile(yk, qs)
# Use the median as the point to recurse on (robust to outliers)
buf.append(float(out[s, 1]))
q05, q50, q95 = out[:, 0], out[:, 1], out[:, 2]
return q50, q05, q95
n_future = 60
fc_med, fc_lo, fc_hi = recursive_forecast_with_band(model_full, y, window_length=L, steps=n_future)
future_dates = pd.date_range(dates[-1] + pd.Timedelta(days=1), periods=n_future, freq="D")
fig = go.Figure()
fig.add_trace(go.Scatter(x=dates, y=y, mode="lines", name="History", line=dict(color="#444")))
fig.add_trace(go.Scatter(x=future_dates, y=fc_hi, mode="lines", line=dict(width=0), showlegend=False))
fig.add_trace(
go.Scatter(
x=future_dates,
y=fc_lo,
mode="lines",
line=dict(width=0),
fill="tonexty",
fillcolor="rgba(31,119,180,0.20)",
name="90% band (heuristic)",
)
)
fig.add_trace(go.Scatter(x=future_dates, y=fc_med, mode="lines", name="Forecast (median)", line=dict(color="#1f77b4", width=3)))
fig.update_layout(height=450, title="Recursive KNN forecast with fan-style band", xaxis_title="Time", yaxis_title="Value")
fig.show()
Practical notes + typical use cases#
When KNN-TS is a good fit#
Analog forecasting: the future behaves like the past when similar patterns occur
Small data: you want a strong baseline without heavy training
Nonlinear but repetitive dynamics
You want an easy-to-explain model: “these past windows looked most like today”
Pitfalls#
Scaling/normalization is crucial: distances are the model.
Large windows increase dimensionality → neighbors become less meaningful (curse of dimensionality).
Brute-force KNN is \(O(n\,L)\) per query for Euclidean (and slower for DTW). For large \(n\), you need approximations / indexing.
Non-stationarity (level shifts, changing seasonality) can break “similarity” unless you normalize or add features.
Extensions#
Use DTW for more flexible shape matching (slower).
Add calendar / exogenous features by concatenating them to the window vector.
Predict multi-step horizons directly by training with horizon \(h > 1\) (target \(y^{(\mathrm{target})}_t = y_{t+h-1}\)) instead of recursing.
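For the exogenous-feature idea, a minimal sketch — the day-of-week encoding is an assumption for illustration, not part of the notebook's model:

```python
import numpy as np

window = np.array([50.1, 50.9, 51.3])  # length-L value window
dow = np.eye(7)[2]                     # hypothetical one-hot day-of-week feature

# Concatenate to form the KNN feature vector. In practice the two parts
# should be scaled comparably, or the Euclidean distance will be dominated
# by the raw values and effectively ignore the one-hot block.
x = np.concatenate([window, dow])
print(x.shape)  # (10,)
```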